package com.czxy

import java.util.Properties

import com.czxy.bean.{HBaseMeta, TagRule}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Gender {


  /**
   * Entry point for the gender tagging job.
   *
   * Flow:
   *   1. Read the level-4 tag (id = 65) from MySQL; its `rule` column describes
   *      the HBase source (`inType=HBase##zkHosts=...##...`).
   *   2. Read the level-5 tags (pid = 65); each row maps a raw gender value to a tag id.
   *   3. Load the user data from HBase via the custom data source.
   *   4. Translate each user's gender into the matching tag id with a UDF.
   *   5. Write the (userId, tagsId) result back to HBase.
   */
  def main(args: Array[String]): Unit = {
    // Create the SparkSession (local mode — intended for development runs).
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("genderTag")
      .getOrCreate()

    import spark.implicits._

    // JDBC connection properties for the MySQL tag database.
    // NOTE(review): credentials are hard-coded — move to external configuration.
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")

    val mysqlDataF: DataFrame = spark.read
      .jdbc("jdbc:mysql://bd001:3306/tags_new", "tbl_basic_tag", prop)

    // Level-4 tag: the single row whose rule describes the HBase source.
    val fourTags: DataFrame = mysqlDataF.select("id", "rule").where("id=65")

    // Parse the rule string, e.g.
    //   inType=HBase##zkHosts=192.168.10.20##zkPort=2181##hbaseTable=tbl_users##family=detail##selectFields=id,gender
    // into a key -> value map. head() replaces collectAsList().get(0);
    // it still fails fast if the tag row is missing.
    val kvMap: Map[String, String] = fourTags
      .map(row =>
        row.getAs("rule").toString
          .split("##")
          .map(entry => {
            val parts = entry.split("=")
            (parts(0), parts(1))
          })
      )
      .head()
      .toMap

    val hbaseMeta: HBaseMeta = toHBaseMeta(kvMap)

    // Level-5 tags under the gender tag (pid = 65), e.g. rule "1" -> id 66, "2" -> id 67.
    val fiveTagData: Dataset[Row] = mysqlDataF.select('id, 'rule).where("pid=65")

    // Materialize the (small) rule table on the driver so the UDF can close over it.
    val genderRules: List[TagRule] = fiveTagData
      .map(row => TagRule(row.getAs("id").toString.toInt, row.getAs("rule").toString))
      .collect()
      .toList
    // genderRules ==> List(TagRule(66,1), TagRule(67,2))

    // Read the user data from HBase through the custom data source
    // (much faster than going through the HBase client API).
    // NOTE(review): the literal "zkHosts" key presumably has an HBaseMeta.ZKHOSTS
    // constant like the others — confirm and use it for consistency.
    val hbaseDatas: DataFrame = spark.read.format("com.czxy.tools.HBaseDataSource")
      .option("zkHosts", hbaseMeta.zkHosts)
      .option(HBaseMeta.ZKPORT, hbaseMeta.zkPort)
      .option(HBaseMeta.HBASETABLE, hbaseMeta.hbaseTable)
      .option(HBaseMeta.FAMILY, hbaseMeta.family)
      .option(HBaseMeta.SELECTFIELDS, hbaseMeta.selectFields)
      .load()

    // Spark SQL built-in functions (udf, ...).
    import org.apache.spark.sql.functions._

    // UDF: map a raw gender value (e.g. "1"/"2") to the matching level-5 tag id;
    // returns 0 when no rule matches (same default as before).
    val getTagId = udf((gender: String) =>
      genderRules.find(_.rule == gender).map(_.id).getOrElse(0)
    )

    val userTags: DataFrame = hbaseDatas.select('id.as("userId"), getTagId('gender).as("tagsId"))

    userTags.show(20)

    // Persist the computed tags back to HBase (table "test").
    userTags.write.format("com.czxy.tools.HBaseDataSource")
      .option("zkHosts", hbaseMeta.zkHosts)
      .option(HBaseMeta.ZKPORT, hbaseMeta.zkPort)
      .option(HBaseMeta.HBASETABLE, "test")
      .option(HBaseMeta.FAMILY, hbaseMeta.family)
      .option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
      .save()
  }


  /**
   * Builds an [[HBaseMeta]] from the parsed level-4 rule map.
   * Missing keys default to the empty string.
   *
   * @param data key/value pairs parsed from the tag's rule string
   * @return the populated HBase source description
   */
  def toHBaseMeta(data: Map[String, String]): HBaseMeta = {
    HBaseMeta(data.getOrElse("inType", ""),
      data.getOrElse("zkHosts", ""),
      data.getOrElse("zkPort", ""),
      data.getOrElse("hbaseTable", ""),
      data.getOrElse("family", ""),
      data.getOrElse("selectFields", ""),
      data.getOrElse("rowKey", ""))
  }
}
